/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.seatunnel.connectors.seatunnel.file.sftp.system;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Progressable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.jcraft.jsch.ChannelSftp;
import com.jcraft.jsch.ChannelSftp.LsEntry;
import com.jcraft.jsch.SftpATTRS;
import com.jcraft.jsch.SftpException;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Vector;

/** SFTP FileSystem. */
public class SFTPFileSystem extends FileSystem {

    public static final Logger LOG = LoggerFactory.getLogger(SFTPFileSystem.class);

    private SFTPConnectionPool connectionPool;
    private URI uri;

    private static final int DEFAULT_SFTP_PORT = 22;
    public static final int DEFAULT_MAX_CONNECTION = 5;
    public static final int DEFAULT_BUFFER_SIZE = 1024 * 1024;
    public static final int DEFAULT_BLOCK_SIZE = 4 * 1024;
    public static final String FS_SFTP_USER_PREFIX = "fs.sftp.user.";
    public static final String FS_SFTP_PASSWORD_PREFIX = "fs.sftp.password.";
    public static final String FS_SFTP_HOST = "fs.sftp.host";
    public static final String FS_SFTP_HOST_PORT = "fs.sftp.host.port";
    public static final String FS_SFTP_KEYFILE = "fs.sftp.keyfile";
    public static final String FS_SFTP_CONNECTION_MAX = "fs.sftp.connection.max";
    public static final String E_SAME_DIRECTORY_ONLY = "only same directory renames are supported";
    public static final String E_HOST_NULL = "Invalid host specified";
    public static final String E_USER_NULL =
            "No user specified for sftp connection. Expand URI or credential file.";
    public static final String E_PATH_DIR = "Path %s is a directory.";
    public static final String E_FILE_STATUS = "Failed to get file status";
    public static final String E_FILE_NOTFOUND = "File %s does not exist.";
    public static final String E_FILE_EXIST = "File already exists: %s";
    public static final String E_CREATE_DIR = "create(): Mkdirs failed to create: %s";
    public static final String E_DIR_CREATE_FROMFILE =
            "Can't make directory for path %s since it is a file.";
    public static final String E_MAKE_DIR_FORPATH =
            "Can't make directory for path \"%s\" under \"%s\".";
    public static final String E_DIR_NOTEMPTY = "Directory: %s is not empty.";
    public static final String E_FILE_CHECK_FAILED = "File check failed";
    public static final String E_NOT_SUPPORTED = "Not supported";
    public static final String E_SPATH_NOTEXIST = "Source path %s does not exist";
    public static final String E_DPATH_EXIST = "Destination path %s already exist, cannot rename!";
    public static final String E_FAILED_GETHOME = "Failed to get home directory";
    public static final String E_FAILED_DISCONNECT = "Failed to disconnect";

    private void setConfigurationFromURI(URI uriInfo, Configuration conf) throws IOException {

        // get host information from URI
        String host = uriInfo.getHost();
        host = (host == null) ? conf.get(FS_SFTP_HOST, null) : host;
        if (host == null) {
            throw new IOException(E_HOST_NULL);
        }
        conf.set(FS_SFTP_HOST, host);

        int port = uriInfo.getPort();
        port = (port == -1) ? conf.getInt(FS_SFTP_HOST_PORT, DEFAULT_SFTP_PORT) : port;
        conf.setInt(FS_SFTP_HOST_PORT, port);

        // get user/password information from URI
        String userAndPwdFromUri = uriInfo.getUserInfo();
        if (userAndPwdFromUri != null) {
            String[] userPasswdInfo = userAndPwdFromUri.split(":");
            String user = userPasswdInfo[0];
            user = URLDecoder.decode(user, "UTF-8");
            conf.set(FS_SFTP_USER_PREFIX + host, user);
            if (userPasswdInfo.length > 1) {
                conf.set(FS_SFTP_PASSWORD_PREFIX + host + "." + user, userPasswdInfo[1]);
            }
        }

        String user = conf.get(FS_SFTP_USER_PREFIX + host);
        if (user == null || user.equals("")) {
            throw new IllegalStateException(E_USER_NULL);
        }

        int connectionMax = conf.getInt(FS_SFTP_CONNECTION_MAX, DEFAULT_MAX_CONNECTION);
        connectionPool = new SFTPConnectionPool(connectionMax, connectionMax);
    }

    private ChannelSftp connect() throws IOException {
        Configuration conf = getConf();

        String host = conf.get(FS_SFTP_HOST, null);
        int port = conf.getInt(FS_SFTP_HOST_PORT, DEFAULT_SFTP_PORT);
        String user = conf.get(FS_SFTP_USER_PREFIX + host, null);
        String pwd = conf.get(FS_SFTP_PASSWORD_PREFIX + host + "." + user, null);
        String keyFile = conf.get(FS_SFTP_KEYFILE, null);

        ChannelSftp channel = connectionPool.connect(host, port, user, pwd, keyFile);

        return channel;
    }

    private void disconnect(ChannelSftp channel) throws IOException {
        connectionPool.disconnect(channel);
    }

    private Path makeAbsolute(Path workDir, Path path) {
        if (path.isAbsolute()) {
            return path;
        }
        return new Path(workDir, path);
    }

    private boolean exists(ChannelSftp channel, Path file) throws IOException {
        try {
            getFileStatus(channel, file);
            return true;
        } catch (FileNotFoundException fnfe) {
            LOG.debug("File does not exist: " + file, fnfe);
            return false;
        } catch (IOException ioe) {
            throw new IOException(E_FILE_STATUS, ioe);
        }
    }

    public String quote(String path) {
        byte[] _path = path.getBytes(StandardCharsets.UTF_8);
        int count = 0;
        for (int i = 0; i < _path.length; i++) {
            byte b = _path[i];
            if (b == '\\' || b == '?' || b == '*') {
                count++;
            }
        }
        if (count == 0) {
            return path;
        }
        byte[] _path2 = new byte[_path.length + count];
        for (int i = 0, j = 0; i < _path.length; i++) {
            byte b = _path[i];
            if (b == '\\' || b == '?' || b == '*') {
                _path2[j++] = '\\';
            }
            _path2[j++] = b;
        }
        return new String(_path2, 0, _path2.length, StandardCharsets.UTF_8);
    }

    /**
     * Convenience method, so that we don't open a new connection when using this method from within
     * another method. Otherwise every API invocation incurs the overhead of opening/closing a TCP
     * connection.
     */
    @SuppressWarnings("unchecked")
    private FileStatus getFileStatus(ChannelSftp client, Path file) throws IOException {
        FileStatus fileStat = null;
        Path workDir;
        try {
            workDir = new Path(client.pwd());
        } catch (SftpException e) {
            throw new IOException(e);
        }
        Path absolute = makeAbsolute(workDir, file);
        Path parentPath = absolute.getParent();
        if (parentPath == null) { // root directory
            long length = -1; // Length of root directory on server not known
            boolean isDir = true;
            int blockReplication = 1;
            long blockSize = DEFAULT_BLOCK_SIZE; // Block Size not known.
            long modTime = -1; // Modification time of root directory not known.
            Path root = new Path("/");
            return new FileStatus(
                    length,
                    isDir,
                    blockReplication,
                    blockSize,
                    modTime,
                    root.makeQualified(this.getUri(), this.getWorkingDirectory()));
        }
        String pathName = parentPath.toUri().getPath();
        Vector<LsEntry> sftpFiles;
        try {
            sftpFiles = (Vector<LsEntry>) client.ls(pathName);
        } catch (SftpException e) {
            throw new FileNotFoundException(String.format(E_FILE_NOTFOUND, file));
        }
        if (sftpFiles != null) {
            for (LsEntry sftpFile : sftpFiles) {
                if (sftpFile.getFilename().equals(file.getName())) {
                    // file found in directory
                    fileStat = getFileStatus(client, sftpFile, parentPath);
                    break;
                }
            }
            if (fileStat == null) {
                throw new FileNotFoundException(String.format(E_FILE_NOTFOUND, file));
            }
        } else {
            throw new FileNotFoundException(String.format(E_FILE_NOTFOUND, file));
        }
        return fileStat;
    }

    private FileStatus getFileStatus(ChannelSftp channel, LsEntry sftpFile, Path parentPath)
            throws IOException {

        SftpATTRS attr = sftpFile.getAttrs();
        long length = attr.getSize();
        boolean isDir = attr.isDir();
        boolean isLink = attr.isLink();
        if (isLink) {
            String link = parentPath.toUri().getPath() + "/" + sftpFile.getFilename();
            try {
                link = channel.realpath(link);

                Path linkParent = new Path("/", link);

                FileStatus fstat = getFileStatus(channel, linkParent);
                isDir = fstat.isDirectory();
                length = fstat.getLen();
            } catch (Exception e) {
                throw new IOException(e);
            }
        }
        int blockReplication = 1;
        // Using default block size since there is no way in SFTP channel to know of
        // block sizes on server. The assumption could be less than ideal.
        long blockSize = DEFAULT_BLOCK_SIZE;
        long modTime = attr.getMTime() * 1000L; // convert to milliseconds
        long accessTime = attr.getATime() * 1000L;
        FsPermission permission = getPermissions(sftpFile);
        // not be able to get the real user group name, just use the user and group
        // id
        String user = Integer.toString(attr.getUId());
        String group = Integer.toString(attr.getGId());
        Path filePath = new Path(parentPath, sftpFile.getFilename());

        return new FileStatus(
                length,
                isDir,
                blockReplication,
                blockSize,
                modTime,
                accessTime,
                permission,
                user,
                group,
                filePath.makeQualified(this.getUri(), this.getWorkingDirectory()));
    }

    private FsPermission getPermissions(LsEntry sftpFile) {
        return new FsPermission((short) sftpFile.getAttrs().getPermissions());
    }

    /**
     * Convenience method, so that we don't open a new connection when using this method from within
     * another method. Otherwise every API invocation incurs the overhead of opening/closing a TCP
     * connection.
     */
    private boolean mkdirs(ChannelSftp client, Path file, FsPermission permission)
            throws IOException {
        boolean created = true;
        Path workDir;
        try {
            workDir = new Path(client.pwd());
        } catch (SftpException e) {
            throw new IOException(e);
        }
        Path absolute = makeAbsolute(workDir, file);
        String pathName = absolute.getName();
        if (!exists(client, absolute)) {
            Path parent = absolute.getParent();
            created = parent == null || mkdirs(client, parent, FsPermission.getDefault());
            if (created) {
                String parentDir = parent.toUri().getPath();
                boolean succeeded = true;
                try {
                    final String previousCwd = client.pwd();
                    client.cd(parentDir);
                    LOG.debug("Creating directory " + pathName);
                    client.mkdir(pathName);
                    client.cd(previousCwd);
                } catch (SftpException e) {
                    throw new IOException(String.format(E_MAKE_DIR_FORPATH, pathName, parentDir));
                }
                created = created & succeeded;
            }
        } else if (isFile(client, absolute)) {
            throw new IOException(String.format(E_DIR_CREATE_FROMFILE, absolute));
        } else {
            LOG.debug("Skipping creation of existing directory " + file);
        }
        if (!created) {
            LOG.debug("Failed to create " + file);
        }
        return created;
    }

    private boolean isFile(ChannelSftp channel, Path file) throws IOException {
        try {
            return !getFileStatus(channel, file).isDirectory();
        } catch (FileNotFoundException e) {
            return false; // file does not exist
        } catch (IOException ioe) {
            throw new IOException(E_FILE_CHECK_FAILED, ioe);
        }
    }

    /**
     * Convenience method, so that we don't open a new connection when using this method from within
     * another method. Otherwise every API invocation incurs the overhead of opening/closing a TCP
     * connection.
     */
    private boolean delete(ChannelSftp channel, Path file, boolean recursive) throws IOException {
        Path workDir;
        try {
            workDir = new Path(channel.pwd());
        } catch (SftpException e) {
            throw new IOException(e);
        }
        Path absolute = makeAbsolute(workDir, file);
        String pathName = absolute.toUri().getPath();
        FileStatus fileStat = null;
        try {
            fileStat = getFileStatus(channel, absolute);
        } catch (FileNotFoundException e) {
            // file not found, no need to delete, return true
            return false;
        }
        if (!fileStat.isDirectory()) {
            boolean status = true;
            try {
                channel.rm(pathName);
            } catch (SftpException e) {
                status = false;
            }
            return status;
        } else {
            boolean status = true;
            FileStatus[] dirEntries = listStatus(channel, absolute);
            if (dirEntries != null && dirEntries.length > 0) {
                if (!recursive) {
                    throw new IOException(String.format(E_DIR_NOTEMPTY, file));
                }
                for (int i = 0; i < dirEntries.length; ++i) {
                    delete(channel, new Path(absolute, dirEntries[i].getPath()), recursive);
                }
            }
            try {
                channel.rmdir(pathName);
            } catch (SftpException e) {
                status = false;
            }
            return status;
        }
    }

    /**
     * Convenience method, so that we don't open a new connection when using this method from within
     * another method. Otherwise every API invocation incurs the overhead of opening/closing a TCP
     * connection.
     */
    @SuppressWarnings("unchecked")
    private FileStatus[] listStatus(ChannelSftp client, Path file) throws IOException {
        Path workDir;
        try {
            workDir = new Path(client.pwd());
        } catch (SftpException e) {
            throw new IOException(e);
        }
        Path absolute = makeAbsolute(workDir, file);
        FileStatus fileStat = getFileStatus(client, absolute);
        if (!fileStat.isDirectory()) {
            return new FileStatus[] {fileStat};
        }
        Vector<LsEntry> sftpFiles;
        try {
            sftpFiles = (Vector<LsEntry>) client.ls(absolute.toUri().getPath());
        } catch (SftpException e) {
            throw new IOException(e);
        }
        ArrayList<FileStatus> fileStats = new ArrayList<FileStatus>();
        for (int i = 0; i < sftpFiles.size(); i++) {
            LsEntry entry = sftpFiles.get(i);
            String fname = entry.getFilename();
            // skip current and parent directory, ie. "." and ".."
            if (!".".equalsIgnoreCase(fname) && !"..".equalsIgnoreCase(fname)) {
                fileStats.add(getFileStatus(client, entry, absolute));
            }
        }
        return fileStats.toArray(new FileStatus[fileStats.size()]);
    }

    private boolean rename(ChannelSftp channel, Path src, Path dst) throws IOException {
        Path workDir;
        try {
            workDir = new Path(channel.pwd());
        } catch (SftpException e) {
            throw new IOException(e);
        }
        Path absoluteSrc = makeAbsolute(workDir, src);
        Path absoluteDst = makeAbsolute(workDir, dst);

        if (!exists(channel, absoluteSrc)) {
            throw new IOException(String.format(E_SPATH_NOTEXIST, src));
        }
        if (exists(channel, absoluteDst)) {
            throw new IOException(String.format(E_DPATH_EXIST, dst));
        }
        boolean renamed = true;
        try {
            final String previousCwd = channel.pwd();
            channel.cd("/");
            channel.rename(src.toUri().getPath(), dst.toUri().getPath());
            channel.cd(previousCwd);
        } catch (SftpException e) {
            renamed = false;
        }
        return renamed;
    }

    @Override
    public void initialize(URI uriInfo, Configuration conf) throws IOException {
        super.initialize(uriInfo, conf);

        setConfigurationFromURI(uriInfo, conf);
        setConf(conf);
        this.uri = uriInfo;
    }

    @Override
    public String getScheme() {
        return "sftp";
    }

    @Override
    public URI getUri() {
        return uri;
    }

    @Override
    public FSDataInputStream open(Path f, int bufferSize) throws IOException {
        ChannelSftp channel = connect();
        Path workDir;
        try {
            workDir = new Path(channel.pwd());
        } catch (SftpException e) {
            throw new IOException(e);
        }
        Path absolute = makeAbsolute(workDir, f);
        FileStatus fileStat = getFileStatus(channel, absolute);
        if (fileStat.isDirectory()) {
            disconnect(channel);
            throw new IOException(String.format(E_PATH_DIR, f));
        }
        InputStream is;
        try {
            // the path could be a symbolic link, so get the real path
            absolute = new Path("/", channel.realpath(absolute.toUri().getPath()));

            is = channel.get(quote(absolute.toUri().getPath()));
        } catch (SftpException e) {
            throw new IOException(e);
        }

        FSDataInputStream fis = new FSDataInputStream(new SFTPInputStream(is, channel, statistics));
        return fis;
    }

    /**
     * A stream obtained via this call must be closed before using other APIs of this class or else
     * the invocation will block.
     */
    @Override
    public FSDataOutputStream create(
            Path f,
            FsPermission permission,
            boolean overwrite,
            int bufferSize,
            short replication,
            long blockSize,
            Progressable progress)
            throws IOException {
        final ChannelSftp client = connect();
        Path workDir;
        try {
            workDir = new Path(client.pwd());
        } catch (SftpException e) {
            throw new IOException(e);
        }
        Path absolute = makeAbsolute(workDir, f);
        if (exists(client, f)) {
            if (overwrite) {
                delete(client, f, false);
            } else {
                disconnect(client);
                throw new IOException(String.format(E_FILE_EXIST, f));
            }
        }
        Path parent = absolute.getParent();
        if (parent == null || !mkdirs(client, parent, FsPermission.getDefault())) {
            parent = (parent == null) ? new Path("/") : parent;
            disconnect(client);
            throw new IOException(String.format(E_CREATE_DIR, parent));
        }
        OutputStream os;
        try {
            final String previousCwd = client.pwd();
            client.cd(parent.toUri().getPath());
            os = client.put(f.getName());
            client.cd(previousCwd);
        } catch (SftpException e) {
            throw new IOException(e);
        }
        FSDataOutputStream fos =
                new FSDataOutputStream(os, statistics) {
                    @Override
                    public void close() throws IOException {
                        super.close();
                        disconnect(client);
                    }
                };

        return fos;
    }

    @Override
    public FSDataOutputStream append(Path f, int bufferSize, Progressable progress)
            throws IOException {
        throw new IOException(E_NOT_SUPPORTED);
    }

    /*
     * The parent of source and destination can be different. It is suppose to
     * work like 'move'
     */
    @Override
    public boolean rename(Path src, Path dst) throws IOException {
        ChannelSftp channel = connect();
        try {
            boolean success = rename(channel, src, dst);
            return success;
        } finally {
            disconnect(channel);
        }
    }

    @Override
    public boolean delete(Path f, boolean recursive) throws IOException {
        ChannelSftp channel = connect();
        try {
            boolean success = delete(channel, f, recursive);
            return success;
        } finally {
            disconnect(channel);
        }
    }

    @Override
    public FileStatus[] listStatus(Path f) throws IOException {
        ChannelSftp client = connect();
        try {
            FileStatus[] stats = listStatus(client, f);
            return stats;
        } finally {
            disconnect(client);
        }
    }

    @Override
    public void setWorkingDirectory(Path newDir) {
        // we do not maintain the working directory state
    }

    @Override
    public Path getWorkingDirectory() {
        // Return home directory always since we do not maintain state.
        return getHomeDirectory();
    }

    @Override
    public Path getHomeDirectory() {
        ChannelSftp channel = null;
        try {
            channel = connect();
            Path homeDir = new Path(channel.pwd());
            return homeDir;
        } catch (Exception ioe) {
            return null;
        } finally {
            try {
                disconnect(channel);
            } catch (IOException ioe) {
            }
        }
    }

    @Override
    public boolean mkdirs(Path f, FsPermission permission) throws IOException {
        ChannelSftp client = connect();
        try {
            boolean success = mkdirs(client, f, permission);
            return success;
        } finally {
            disconnect(client);
        }
    }

    @Override
    public FileStatus getFileStatus(Path f) throws IOException {
        ChannelSftp channel = connect();
        try {
            FileStatus status = getFileStatus(channel, f);
            return status;
        } finally {
            disconnect(channel);
        }
    }

    @Override
    public void close() throws IOException {
        super.close();
        connectionPool.shutdown();
    }
}
